feat!: Use AgentEmitter instead of EventQueue in AgentExecutor methods#621
feat!: Use AgentEmitter instead of EventQueue in AgentExecutor methods#621kabir wants to merge 3 commits intoa2aproject:mainfrom
Conversation
Summary of ChangesHello @kabir, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly refactors the internal event processing and agent communication mechanisms to improve robustness, consistency, and developer experience. By introducing a dedicated Highlights
Changelog
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This is an excellent and significant refactoring that greatly improves the architecture and robustness of the A2A Java SDK. The introduction of the AgentEmitter provides a much cleaner and simpler API for agent developers, abstracting away the complexities of the EventQueue. The new MainEventBus architecture with its "persistence first" approach is a major step forward for data consistency and reliability, especially in replicated environments. The improvements to thread pool management, client disconnect handling, and replication logic are all critical for stability and performance. The test suite has also been made more robust and reliable. The minor suggestion regarding a comment in a test file has been retained as it does not contradict any established rules.
...ore/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java
Show resolved
Hide resolved
065787e to
8bb5d93
Compare
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a significant and beneficial refactoring by replacing the TaskUpdater with a new AgentEmitter class. This change provides a cleaner and more powerful API for agent developers, encapsulating task lifecycle management and message sending in a single place. The new AgentEmitter is well-documented and includes helpful builders for creating tasks and messages.
Key improvements include:
- A unified
AgentEmitterAPI for all agent-to-client communication. - Simplification of agent implementations, as seen in the updated examples and tests.
- Introduction of
taskIdvalidation in theEventQueuefor improved robustness.
The changes have been applied consistently throughout the codebase. I have one minor suggestion in a test file to improve the logic after a task is failed. Overall, this is an excellent pull request that improves the SDK's design and usability.
tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java
Outdated
Show resolved
Hide resolved
8bb5d93 to
4b2052d
Compare
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a significant and valuable refactoring by replacing TaskUpdater and direct EventQueue access with the new AgentEmitter class. This greatly simplifies the API for agent developers, providing a clear and concise way to manage task lifecycles and send events. The changes are applied consistently across the entire codebase, including documentation, examples, and tests. The addition of task ID validation in the event queue is also a great improvement for correctness.
I have a couple of suggestions for the new AgentEmitter class to further improve its robustness and implementation.
| public void fail(A2AError error) { | ||
| eventQueue.enqueueEvent(error); | ||
| updateStatus(TaskState.FAILED); | ||
| } |
There was a problem hiding this comment.
In the fail(A2AError error) method, the error event is enqueued before the task status is updated to FAILED. If another thread has already moved the task to a terminal state (e.g., COMPLETED), updateStatus will throw an IllegalStateException. However, the error event will have already been sent to the client, leading to an inconsistent state where the client sees an error but the task is marked as completed.
Reversing the order of operations would be safer for maintaining server-side state consistency. First, attempt to transition the task to the FAILED state. If that succeeds, then enqueue the error event for the client.
public void fail(A2AError error) {
updateStatus(TaskState.FAILED);
eventQueue.enqueueEvent(error);
}| public void updateStatus(TaskState state, @Nullable Message message, boolean isFinal) { | ||
| synchronized (stateLock) { | ||
| // Check if we're already in a terminal state | ||
| if (terminalStateReached.get()) { | ||
| throw new IllegalStateException("Cannot update task status - terminal state already reached"); | ||
| } | ||
|
|
||
| // If this is a final state, set the flag | ||
| if (isFinal) { | ||
| terminalStateReached.set(true); | ||
| } | ||
|
|
||
| TaskStatusUpdateEvent event = TaskStatusUpdateEvent.builder() | ||
| .taskId(taskId) | ||
| .contextId(contextId) | ||
| .isFinal(isFinal) | ||
| .status(new TaskStatus(state, message, null)) | ||
| .build(); | ||
| eventQueue.enqueueEvent(event); | ||
| } | ||
| } |
There was a problem hiding this comment.
The updateStatus method uses synchronized (stateLock) to protect against concurrent updates to the terminal state. This can be simplified by using the atomic nature of AtomicBoolean more directly with compareAndSet, which is more idiomatic and performant under contention. This change would also allow for the removal of the stateLock field.
public void updateStatus(TaskState state, @Nullable Message message, boolean isFinal) {
if (isFinal) {
if (!terminalStateReached.compareAndSet(false, true)) {
throw new IllegalStateException("Cannot update task status - terminal state already reached");
}
} else {
if (terminalStateReached.get()) {
throw new IllegalStateException("Cannot update task status - terminal state already reached");
}
}
TaskStatusUpdateEvent event = TaskStatusUpdateEvent.builder()
.taskId(taskId)
.contextId(contextId)
.isFinal(isFinal)
.status(new TaskStatus(state, message, null))
.build();
eventQueue.enqueueEvent(event);
}4b2052d to
7d74837
Compare
BREAKING CHANGE: AgentEmitter contains the methods from the old TaskUpdater and is now the only way agents send results back to the caller. This hides the EventQueue mechanism from users. Also introduced a check that when placing a full Task object on the queue, which should only be done for calls with no existing Task, that the Task's ID is the one expected for the queue, as calculated by the RequestContext. Added Message and Task builders to AgentEmitter to help with using the proper taskID and contextId.
7d74837 to
4d47a01
Compare
BREAKING CHANGE: AgentEmitter contains the methods from the old TaskUpdater
and is now the only way agents send results back to the caller. This hides
the EventQueue mechanism from users.
Also introduced a check that when placing a full Task object on the queue,
which should only be done for calls with no existing Task, that the
Task's ID is the one expected for the queue, as calculated by the
RequestContext.
Added Message and Task builders to AgentEmitter to help with using the
proper taskID and contextId.
Fixes #604 🦕